import { interval, Subject } from "rxjs";
// takeUntil是RxJS中的一个变换操作符，它会取消订阅源 Observable，并停止发出值，直到另一个 Observable 发出值
import { takeUntil } from "rxjs/operators";

const source$ = interval(1000);
const stop$ = new Subject();
//source$会不断的产出值，直到stop$产出值为止
source$.pipe(takeUntil(stop$)).subscribe(console.log);

setTimeout(() => {
  // stop$产出， source就不再执行
  stop$.next(100);
}, 3000);
